OkHttp源码分析(二)

OkHttp的请求过程

在上一篇中我们大概了解了OkHttp大体的执行过程,本篇将进一步介绍OkHttp请求的具体过程,这设计到部分拦截器的具体细节。

####建立连接

关于内置的拦截器将会在其他篇章中分别做介绍,这里只介绍涉及到具体的Http请求过程的拦截器,它们是ConnectInterceptor和CallServerInterceptor分别负责连接的建立和Http请求的发起。我们先看ConnectInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;

public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}

@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();//得到一个StreamAllocation

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);//分配一个Stream
RealConnection connection = streamAllocation.connection();//建立一个连接

return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}

ConnectInterceptor的最终目的是和服务器建立连接,它首先从拦截器链中得到的StreamAllocation,但是拦截器链在初始时streamAllocation并未创建,那么它是何时创建,在哪里创建呢?可以确定的是它一定是在拦截器链ConnectInterceptor之前的拦截器中进行初始化的,通过寻找发现它是在RetryAndFollowUpInterceptor中创建的。

1
2
3
4
5
6
7
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();

streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
……
}

这个StreamAllocation到底是做什么的呢?我们看看它的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* This class coordinates the relationship between three entities:
*
* <ul>
* <li><strong>Connections:</strong> physical socket connections to remote servers. These are
* potentially slow to establish so it is necessary to be able to cancel a connection
* currently being connected.
* <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on
* connections. Each connection has its own allocation limit, which defines how many
* concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream
* at a time, HTTP/2 typically carry multiple.
* <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and
* its follow up requests. We prefer to keep all streams of a single call on the same
* connection for better behavior and locality.
* </ul>
...
*/
public final class StreamAllocation {
public final Address address;
private Route route;
private final ConnectionPool connectionPool;
private final Object callStackTrace;

// State guarded by connectionPool.
private final RouteSelector routeSelector;
private int refusedStreamCount;
private RealConnection connection;
private boolean released;
private boolean canceled;
private HttpCodec codec;

public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.routeSelector = new RouteSelector(address, routeDatabase());
this.callStackTrace = callStackTrace;
}
……
}

从注释中我们可以了解到StreamAllocation的作用,它负责协调Connections,Streams以及Calls三者的关系,Connnections是代表了连接远程服务的物理socket连接,Streams是依赖于Connnections逻辑上的Http Request/Response请求对。每个connections都有它的分配上限,这个决定了connections可以并发支持的streams数目,Calls是streams逻辑上的序列,它实际上是一组Request。清楚了这些,再看其构造方法,其中ConnectionPool想必就是负责物理连接的,Address描述了我们需要连接的远端服务,实际上它是通过url来得到的。

接下来要为一次请求创建一个Stream,这个Stream是逻辑上的Request/Response对,通过StreamAllocation的newStream方法创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();//超时连接时间
int readTimeout = client.readTimeoutMillis();//读超时时间
int writeTimeout = client.writeTimeoutMillis();//写超时时间
boolean connectionRetryEnabled = client.retryOnConnectionFailure();//是否支持重连

try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);//为Stream找到一个可用的Connection
HttpCodec resultCodec = resultConnection.newCodec(client, this);//取到HttpCodec

synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}

newStream方法为stream找到一个可用的连接,然后通过连接的newCodec得到一个HttpCodec,它负责对Http的Request进行编码以及对Response进行解码。接着再看findHealthyConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}

return candidate;
}
}

通过一个while循环直到找到一个health RealConnection.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {//对连接池加锁
...
// Attempt to use an already-allocated connection.
//重用已经分配的连接
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}

// Attempt to get a connection from the pool.
//尝试从连接池中获取一个连接
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}

selectedRoute = route;
}

// If we need a route, make one. This is a blocking operation.
//需要选择一个路由
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}

RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");

// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
Internal.instance.get(connectionPool, address, this, selectedRoute);//从连接池中取到一个匹配路由的连接
if (connection != null) return connection;

// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);//创建一个连接
acquire(result);//保存在connection中
}

// Do TCP + TLS handshakes. This is a blocking operation.
//建立TCP和TCL的连接,这是一个阻塞操作
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());

Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
Internal.instance.put(connectionPool, result);//存放在连接池中

// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);

return result;//返回连接
}

findConnection负责从连接池中取出一个匹配的RealConnection,如果未找到就创建一个新的,并通过connect建立和远端服务的连接。这个是通过RealConnection的connect方法来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final class RealConnection extends Http2Connection.Listener implements Connection {
private static final String NPE_THROW_WITH_NULL = "throw with null exception";
private final ConnectionPool connectionPool;
private final Route route;

// The fields below are initialized by connect() and never reassigned.

/** The low-level TCP socket. */
private Socket rawSocket;//底层 TCP的socket

/**
* The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
* {@link #rawSocket} itself if this connection does not use SSL.
*/
private Socket socket;//应用层的socket要么是基于rawSocket之上的SSLSocket,要么是rawSocket自己,表示没有用到SSL
private Handshake handshake;
private Protocol protocol;
private Http2Connection http2Connection;
private BufferedSource source;
private BufferedSink sink;//sink和source是让这个Connection可以以流的形式和服务器进行交互
public boolean noNewStreams;//如果为true,则不能在该连接上创建新的Stream
public int allocationLimit = 1;//分配流的数量上限,默认为1

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public void connect(
int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
if (protocol != null) throw new IllegalStateException("already connected");

RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}

while (true) {
try {
if (route.requiresTunnel()) {//需要建立隧道连接
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {//创建Socket
connectSocket(connectTimeout, readTimeout);
}
establishProtocol(connectionSpecSelector);//建立协议
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;

if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}

if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}

if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}

connect负责连接的建立过程,这里通过while循环来创建连接,保证连接能够创建成功,在循环中首先根据requiresTunnel判读是否创建隧道连接,它需要满足

1
2
3
public boolean requiresTunnel() {
return address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP;
}

即对于设置了HTTP代理,且安全的连接。这时候需要请求HTTP代理建立一个到目标HTTP服务器的隧道连接,客户端和HTTP代理建立TCP连接。这里我们只关心不设置代理的情况,即创建连接通过connectSocket来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 /** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();

rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);//创建socket

rawSocket.setSoTimeout(readTimeout);
try {
//建立连接,这里我们关注AndroidPlatform下的connectSocket
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}

// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
//得到输入输出流
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}

这里我们看AndroidPlatform的connectSocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override public void connectSocket(Socket socket, InetSocketAddress address,
int connectTimeout) throws IOException {
try {
socket.connect(address, connectTimeout);//创建socket TCP连接
} catch (AssertionError e) {
if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
} catch (SecurityException e) {
// Before android 4.3, socket.connect could throw a SecurityException
// if opening a socket resulted in an EACCES error.
IOException ioException = new IOException("Exception in connect");
ioException.initCause(e);
throw ioException;
}
}

connectSocket实际就是通过创建的rawSocket来建立TCP连接,连接建立成功后,需要进一步建立协议,它主要为数据的加密传输做一些初始化,比如TLS握手,HTTP/2的协议协商等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
if (route.address().sslSocketFactory() == null) {//http/1.1
protocol = Protocol.HTTP_1_1;
socket = rawSocket;
return;
}

connectTls(connectionSpecSelector);//连接到TLS

if (protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.build();
http2Connection.start();
}
}

未配置sslScoketFactor就默认是HTTP/1.1,这时候socket和rawSocket是同一个,否则需要为安全请求建立TLS连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
Address address = route.address();
SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
boolean success = false;
SSLSocket sslSocket = null;
try {
// Create the wrapper over the connected socket.
sslSocket = (SSLSocket) sslSocketFactory.createSocket(
rawSocket, address.url().host(), address.url().port(), true /* autoClose */);//基于rawSocket创建SSLSocket

// Configure the socket's ciphers, TLS versions, and extensions.
//配置SSLSocket
ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
if (connectionSpec.supportsTlsExtensions()) {
//配置扩展参数
Platform.get().configureTlsExtensions(
sslSocket, address.url().host(), address.protocols());
}

// Force handshake. This can throw!
sslSocket.startHandshake();//开始握手
Handshake unverifiedHandshake = Handshake.get(sslSocket.getSession());//获取握手信息

// Verify that the socket's certificates are acceptable for the target host.
//验证socket证书是否适用于目标主机
if (!address.hostnameVerifier().verify(address.url().host(), sslSocket.getSession())) {
X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
throw new SSLPeerUnverifiedException("Hostname " + address.url().host() + " not verified:"
+ "\n certificate: " + CertificatePinner.pin(cert)
+ "\n DN: " + cert.getSubjectDN().getName()
+ "\n subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
}

// Check that the certificate pinner is satisfied by the certificates presented
//验证证书pinner
address.certificatePinner().check(address.url().host(),
unverifiedHandshake.peerCertificates());

// Success! Save the handshake and the ALPN protocol.
String maybeProtocol = connectionSpec.supportsTlsExtensions()
? Platform.get().getSelectedProtocol(sslSocket)
: null;
socket = sslSocket;
//得到输入输出流
source = Okio.buffer(Okio.source(socket));
sink = Okio.buffer(Okio.sink(socket));
handshake = unverifiedHandshake;
protocol = maybeProtocol != null
? Protocol.get(maybeProtocol)
: Protocol.HTTP_1_1;
success = true;
} catch (AssertionError e) {
if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
} finally {
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket);
}
if (!success) {
closeQuietly(sslSocket);
}
}
}

TLS连接是对原始的TCP连接的一个封装,以提供TLS握手,及数据收发过程中的加密解密等功能。它通过SSLSocket来描述,建立一个TLS连接的步骤为:

  1. 配置SSLSocket
  2. 配置扩展参数
  3. 开始握手
  4. 获取握手信息
  5. 验证证书信息
  6. 创建用于执行IO的BufferedSource和BufferedSink等,并保存握手信息及所选择的协议

建立好连接后,在StreamAllocation的newStream中接下来就是创建HttpCodec返回,HttpCodec负责Request和Response的编解码。这里我们看看它的创建过程

1
2
3
4
5
6
7
8
9
10
11
public HttpCodec newCodec(
OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {//对于HTTP/2.0
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}

newCodec会根据HTTP的版本来创建Http2Codec或者Http1Codec对象。

###发起请求

建立好连接后,就需要发起http请求了,这个是在CallServerInterceptor这个拦截器中进行的,我们看看它的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();//获取HttpCodec,它用来描述一个http流
StreamAllocation streamAllocation = realChain.streamAllocation();//获取StreamAllocation
RealConnection connection = (RealConnection) realChain.connection();//获取到RealConnection,它代表了到服务器的连接
Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();
httpCodec.writeRequestHeaders(request);//发起Http请求

Response.Builder responseBuilder = null;
//检测是否有请求Body
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}

if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
// being reused. Otherwise we're still obligated to transmit the request body to leave the
// connection in a consistent state.
streamAllocation.noNewStreams();
}
}

httpCodec.finishRequest();
//创建负责请求的Builder
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
//得到响应Response
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}

可以看到整个Http请求和响应的过程是通过HttpCodec来进行操作的,首先通过writeRequestHeaders发起http请求,然后创建Response.Builder对象,负责接收响应。

坚持原创技术分享,您的支持将鼓励我继续创作!